Skip to content

LangGraph Interrupts 集成方案

一、模块概述

属性说明
模块名称Interrupts(中断/人机协作)
优先级🟠 P1(中高)
预估工时1-2 天
依赖项langgraph.types, Checkpointer

为什么需要

某些敏感操作需要人工确认后才能执行,提高系统安全性和可控性:

  • 敏感操作审批(发送邮件、删除数据、执行支付)
  • 内容审核(AI 生成内容需要确认后发布)
  • 信息确认(关键信息需要用户确认)

二、架构设计

2.1 人机协作流程

┌──────────────────────────────────────────────────────────────────┐
│                    Interrupts 人机协作流程                          │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│   用户请求 ──► Agent 处理 ──► 检测敏感操作                         │
│                                    │                              │
│                                    ▼                              │
│                            ┌──────────────┐                      │
│                            │   interrupt   │                      │
│                            │   (暂停执行)   │                      │
│                            └──────┬───────┘                      │
│                                   │                              │
│                                   ▼                              │
│   ┌─────────────────────────────────────────────────────────┐  │
│   │                     返回前端等待确认                        │  │
│   │           __interrupts__ = [{action, details, message}]    │  │
│   └─────────────────────────────────────────────────────────┘  │
│                                   │                              │
│                    ┌──────────────┴──────────────┐              │
│                    │                              │              │
│                    ▼                              ▼              │
│            ┌──────────────┐            ┌──────────────┐        │
│            │   用户批准    │            │   用户拒绝    │        │
│            │ resume=True  │            │ resume=False │        │
│            └──────┬───────┘            └──────┬───────┘        │
│                   │                              │              │
│                   ▼                              ▼              │
│            ┌──────────────┐            ┌──────────────┐        │
│            │   执行操作    │            │   取消操作    │        │
│            └──────────────┘            └──────────────┘        │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

2.2 状态设计

python
from typing import TypedDict, Optional, List, Any
from langgraph.graph import MessagesState

class HumanApprovalState(MessagesState):
    """支持人机审批的状态"""
    # 继承 messages 字段
    pending_action: Optional[dict]  # 待审批的操作
    approved: Optional[bool]       # 审批结果
    action_result: Optional[str]   # 操作执行结果

三、代码实现

3.1 中断工具定义

创建文件: services/tools/approval.py

python
"""需要人工审批的工具

这些工具在执行敏感操作前会中断,等待用户确认。
"""
from typing import Optional
from langchain.tools import tool
import logging

logger = logging.getLogger(__name__)


@tool
def send_email(to: str, subject: str, body: str) -> dict:
    """
    发送邮件。

    这是一个需要人工审批的操作。在执行前会暂停,等待用户确认。

    Args:
        to: 收件人邮箱地址
        subject: 邮件主题
        body: 邮件正文

    Returns:
        包含操作信息的字典(实际发送在审批后执行)
    """
    # 返回操作信息,实际执行在审批后
    return {
        "action": "send_email",
        "requires_approval": True,
        "details": {
            "to": to,
            "subject": subject,
            "body": body[:100] + "..." if len(body) > 100 else body
        }
    }


@tool
def delete_conversation(conversation_id: str, confirm: bool = False) -> dict:
    """
    删除对话记录。

    这是一个需要人工审批的敏感操作。

    Args:
        conversation_id: 要删除的对话 ID
        confirm: 确认删除(需要用户确认后设为 True)

    Returns:
        包含操作信息的字典
    """
    return {
        "action": "delete_conversation",
        "requires_approval": True,
        "details": {
            "conversation_id": conversation_id
        }
    }


@tool
def update_user_info(field: str, new_value: str) -> dict:
    """
    更新用户信息。

    修改用户个人信息需要用户确认。

    Args:
        field: 要修改的字段名(如 nickname, email)
        new_value: 新的值

    Returns:
        包含操作信息的字典
    """
    return {
        "action": "update_user_info",
        "requires_approval": True,
        "details": {
            "field": field,
            "new_value": new_value
        }
    }


# 需要审批的工具列表
APPROVAL_REQUIRED_TOOLS = [send_email, delete_conversation, update_user_info]

3.2 带中断的 Agent 服务

创建文件: services/langgraph_approval.py

python
"""带人机审批的 LangGraph Agent

使用 interrupt 实现敏感操作的人工确认。
"""
import os
import logging
from typing import Optional, Dict, Any, Literal
from dataclasses import dataclass, field
from dotenv import load_dotenv

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.types import interrupt, Command
from langgraph.prebuilt import ToolNode

from services.tools import ALL_TOOLS
from services.tools.approval import APPROVAL_REQUIRED_TOOLS

load_dotenv(override=True)

logger = logging.getLogger(__name__)


@dataclass
class ApprovalConfig:
    """审批配置"""
    api_key: str = field(default_factory=lambda: os.getenv("OPENROUTER_API_KEY", ""))
    base_url: str = field(default_factory=lambda: os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1"))
    default_model: str = field(default_factory=lambda: os.getenv("OPENROUTER_MODEL", "openai/gpt-4o"))


class ApprovalEnabledAgent:
    """带人机审批的 Agent"""

    # 需要审批的操作类型
    APPROVAL_ACTIONS = {"send_email", "delete_conversation", "update_user_info"}

    def __init__(self, config: Optional[ApprovalConfig] = None):
        self.config = config or ApprovalConfig()
        self.all_tools = ALL_TOOLS + APPROVAL_REQUIRED_TOOLS

    def _get_llm(self, model: Optional[str] = None, temperature: float = 0.7) -> ChatOpenAI:
        """获取绑定了工具的 LLM"""
        return ChatOpenAI(
            model=model or self.config.default_model,
            api_key=self.config.api_key,
            base_url=self.config.base_url,
            temperature=temperature
        ).bind_tools(self.all_tools)

    def _should_continue(self, state: MessagesState) -> Literal["tools", "approval", END]:
        """判断下一步操作"""
        last_message = state["messages"][-1]

        if not last_message.tool_calls:
            return END

        # 检查是否有需要审批的工具
        for tool_call in last_message.tool_calls:
            if tool_call["name"] in self.APPROVAL_ACTIONS:
                return "approval"

        return "tools"

    def _approval_node(self, state: MessagesState) -> Command[Literal["tools", "agent"]]:
        """审批节点 - 中断等待用户确认"""
        last_message = state["messages"][-1]
        tool_calls = last_message.tool_calls

        # 收集所有需要审批的操作
        approval_requests = []
        for tool_call in tool_calls:
            if tool_call["name"] in self.APPROVAL_ACTIONS:
                approval_requests.append({
                    "tool_call_id": tool_call["id"],
                    "action": tool_call["name"],
                    "args": tool_call["args"]
                })

        # 中断等待用户确认
        decision = interrupt({
            "type": "approval_required",
            "message": "以下操作需要您的确认",
            "requests": approval_requests
        })

        # decision 可能是:
        # - True: 批准所有
        # - False: 拒绝所有
        # - dict: {"tool_call_id": bool} 针对每个操作单独审批

        if decision is True:
            # 批准所有,继续执行工具
            return Command(goto="tools")
        elif decision is False:
            # 拒绝所有,返回拒绝消息
            return Command(
                goto="agent",
                update={"messages": [AIMessage(content="用户取消了操作。")]}
            )
        elif isinstance(decision, dict):
            # 部分批准 - 这里简化处理,实际可能需要更复杂的逻辑
            approved_ids = [k for k, v in decision.items() if v]
            if approved_ids:
                return Command(goto="tools")
            return Command(
                goto="agent",
                update={"messages": [AIMessage(content="用户取消了操作。")]}
            )

        return Command(goto="tools")

    def _build_graph(self, checkpointer, model: Optional[str] = None, system_prompt: Optional[str] = None):
        """构建带审批的工作流"""
        llm = self._get_llm(model)
        tool_node = ToolNode(self.all_tools)

        def call_model(state: MessagesState):
            messages = [SystemMessage(content=system_prompt or "你是一个有用的助手。")] + state["messages"]
            response = llm.invoke(messages)
            return {"messages": [response]}

        workflow = StateGraph(MessagesState)
        workflow.add_node("agent", call_model)
        workflow.add_node("tools", tool_node)
        workflow.add_node("approval", self._approval_node)

        workflow.add_edge(START, "agent")
        workflow.add_conditional_edges(
            "agent",
            self._should_continue,
            {"tools": "tools", "approval": "approval", END: END}
        )
        workflow.add_edge("tools", "agent")
        # 审批节点的条件跳转在节点内部通过 Command 实现

        return workflow.compile(checkpointer=checkpointer)

    def chat(self, thread_id: str, prompt: str, **kwargs) -> Dict[str, Any]:
        """对话(可能需要审批)"""
        from services.checkpointer import get_checkpointer

        with get_checkpointer() as checkpointer:
            app = self._build_graph(checkpointer)
            config = {"configurable": {"thread_id": thread_id}}

            result = app.invoke(
                {"messages": [HumanMessage(content=prompt)]},
                config
            )

            # 检查是否需要审批
            if "__interrupts__" in result:
                return {
                    "needs_approval": True,
                    "interrupts": result["__interrupts__"],
                    "thread_id": thread_id
                }

            return {
                "content": result["messages"][-1].content,
                "needs_approval": False
            }

    def resume(self, thread_id: str, decision: bool) -> Dict[str, Any]:
        """恢复中断的执行"""
        from services.checkpointer import get_checkpointer

        with get_checkpointer() as checkpointer:
            app = self._build_graph(checkpointer)
            config = {"configurable": {"thread_id": thread_id}}

            result = app.invoke(
                Command(resume=decision),
                config
            )

            return {"content": result["messages"][-1].content}


# 全局实例
_approval_agent: Optional[ApprovalEnabledAgent] = None


def get_approval_agent() -> ApprovalEnabledAgent:
    global _approval_agent
    if _approval_agent is None:
        _approval_agent = ApprovalEnabledAgent()
    return _approval_agent

四、API 集成

4.1 新增 API 端点

修改 api/chat.py:

python
from services.langgraph_approval import get_approval_agent

@router.post("/chat/approval")
async def chat_with_approval(
    data: ChatRequest,
    request: Request,
    db: Session = Depends(get_db)
):
    """可能需要审批的聊天"""
    session = get_session(request)
    agent = get_approval_agent()

    thread_id = data.conversation_id or str(uuid.uuid4())

    result = agent.chat(
        thread_id=thread_id,
        prompt=data.prompt,
        model=data.model,
        system_prompt=data.system_prompt
    )

    if result.get("needs_approval"):
        # 需要审批,返回中断信息
        return {
            "success": True,
            "needs_approval": True,
            "thread_id": thread_id,
            "interrupts": result["interrupts"]
        }

    return {
        "success": True,
        "needs_approval": False,
        "content": result["content"]
    }


@router.post("/chat/approval/resume")
async def resume_approval(
    thread_id: str = Form(...),
    approved: bool = Form(...),
    request: Request = None
):
    """恢复中断的执行"""
    agent = get_approval_agent()

    result = agent.resume(thread_id=thread_id, decision=approved)

    return {
        "success": True,
        "content": result["content"]
    }

五、前端集成

5.1 审批对话框组件

创建 static/js/components/approval-dialog.js:

javascript
/**
 * 审批对话框组件
 */
class ApprovalDialog {
    constructor() {
        this.dialog = null;
    }

    /**
     * 显示审批对话框
     * @param {Object} interrupts - 中断信息
     * @param {string} threadId - 线程 ID
     * @returns {Promise<boolean>} - 用户决定
     */
    show(interrupts, threadId) {
        return new Promise((resolve) => {
            // 创建对话框
            this.dialog = document.createElement('div');
            this.dialog.className = 'approval-dialog-overlay';
            this.dialog.innerHTML = `
                <div class="approval-dialog">
                    <div class="approval-header">
                        <h3>⚠️ 操作确认</h3>
                    </div>
                    <div class="approval-body">
                        ${this._renderRequests(interrupts)}
                    </div>
                    <div class="approval-footer">
                        <button class="btn-approve" id="approve-btn">✓ 批准</button>
                        <button class="btn-reject" id="reject-btn">✗ 拒绝</button>
                    </div>
                </div>
            `;

            document.body.appendChild(this.dialog);

            // 绑定事件
            document.getElementById('approve-btn').onclick = async () => {
                await this._submitDecision(threadId, true);
                resolve(true);
                this.close();
            };

            document.getElementById('reject-btn').onclick = async () => {
                await this._submitDecision(threadId, false);
                resolve(false);
                this.close();
            };
        });
    }

    _renderRequests(interrupts) {
        return interrupts.map(intr => {
            const req = intr.value.requests[0];
            return `
                <div class="approval-item">
                    <div class="approval-action">${this._getActionLabel(req.action)}</div>
                    <div class="approval-details">
                        ${this._renderDetails(req.action, req.args)}
                    </div>
                </div>
            `;
        }).join('');
    }

    _getActionLabel(action) {
        const labels = {
            'send_email': '📧 发送邮件',
            'delete_conversation': '🗑️ 删除对话',
            'update_user_info': '✏️ 更新信息'
        };
        return labels[action] || action;
    }

    _renderDetails(action, args) {
        switch (action) {
            case 'send_email':
                return `
                    <p><strong>收件人:</strong> ${args.to}</p>
                    <p><strong>主题:</strong> ${args.subject}</p>
                    <p><strong>内容预览:</strong> ${args.body}</p>
                `;
            case 'delete_conversation':
                return `<p><strong>对话 ID:</strong> ${args.conversation_id}</p>`;
            case 'update_user_info':
                return `
                    <p><strong>修改字段:</strong> ${args.field}</p>
                    <p><strong>新值:</strong> ${args.new_value}</p>
                `;
            default:
                return `<pre>${JSON.stringify(args, null, 2)}</pre>`;
        }
    }

    async _submitDecision(threadId, approved) {
        try {
            const response = await fetch('/api/chat/approval/resume', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({thread_id: threadId, approved: approved})
            });
            const result = await response.json();

            if (result.success) {
                // 继续显示 AI 响应
                appendMessage('assistant', result.content);
            }
        } catch (error) {
            console.error('提交审批决定失败:', error);
        }
    }

    close() {
        if (this.dialog) {
            this.dialog.remove();
            this.dialog = null;
        }
    }
}

// 全局实例
const approvalDialog = new ApprovalDialog();

5.2 CSS 样式

添加到 static/css/index.css:

css
/* 审批对话框样式 */
.approval-dialog-overlay {
    position: fixed;
    top: 0;
    left: 0;
    right: 0;
    bottom: 0;
    background: rgba(0, 0, 0, 0.5);
    display: flex;
    align-items: center;
    justify-content: center;
    z-index: 1000;
}

.approval-dialog {
    background: white;
    border-radius: 12px;
    max-width: 500px;
    width: 90%;
    box-shadow: 0 4px 20px rgba(0, 0, 0, 0.15);
}

.approval-header {
    padding: 20px;
    border-bottom: 1px solid #eee;
}

.approval-header h3 {
    margin: 0;
    color: #ff9800;
}

.approval-body {
    padding: 20px;
    max-height: 300px;
    overflow-y: auto;
}

.approval-item {
    background: #f5f5f5;
    padding: 15px;
    border-radius: 8px;
    margin-bottom: 10px;
}

.approval-action {
    font-weight: bold;
    margin-bottom: 10px;
}

.approval-details p {
    margin: 5px 0;
    font-size: 14px;
}

.approval-footer {
    padding: 20px;
    border-top: 1px solid #eee;
    display: flex;
    gap: 10px;
    justify-content: flex-end;
}

.btn-approve, .btn-reject {
    padding: 10px 20px;
    border: none;
    border-radius: 6px;
    cursor: pointer;
    font-size: 14px;
    transition: all 0.2s;
}

.btn-approve {
    background: #4caf50;
    color: white;
}

.btn-approve:hover {
    background: #43a047;
}

.btn-reject {
    background: #f44336;
    color: white;
}

.btn-reject:hover {
    background: #e53935;
}

六、测试计划

6.1 单元测试

python
# tests/test_approval.py
import pytest
from services.langgraph_approval import ApprovalEnabledAgent


def test_should_continue_no_tool_calls():
    """测试无工具调用时直接结束"""
    agent = ApprovalEnabledAgent()

    from langchain_core.messages import AIMessage
    state = {"messages": [AIMessage(content="你好")]}

    result = agent._should_continue(state)
    assert result == END


def test_should_continue_normal_tool():
    """测试普通工具调用"""
    agent = ApprovalEnabledAgent()

    from langchain_core.messages import AIMessage
    msg = AIMessage(content="")
    msg.tool_calls = [{"name": "search_web", "args": {"query": "test"}}]

    state = {"messages": [msg]}
    result = agent._should_continue(state)
    assert result == "tools"


def test_should_continue_approval_tool():
    """测试需要审批的工具调用"""
    agent = ApprovalEnabledAgent()

    from langchain_core.messages import AIMessage
    msg = AIMessage(content="")
    msg.tool_calls = [{"name": "send_email", "args": {"to": "test@example.com"}}]

    state = {"messages": [msg]}
    result = agent._should_continue(state)
    assert result == "approval"

七、实施步骤

步骤 1: 创建审批工具(0.5 天)

  • 创建 services/tools/approval.py
  • 定义需要审批的工具
  • 编写单元测试

步骤 2: 创建审批 Agent(0.5 天)

  • 创建 services/langgraph_approval.py
  • 实现 _approval_node 中断节点
  • 实现 resume 方法

步骤 3: API 集成(0.25 天)

  • 添加 /api/chat/approval 端点
  • 添加 /api/chat/approval/resume 端点

步骤 4: 前端集成(0.5 天)

  • 创建审批对话框组件
  • 添加 CSS 样式
  • 集成到聊天流程

步骤 5: 测试(0.25 天)

  • 端到端测试
  • 修复问题

八、相关文档